# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import sympy as sm
from hysop.constants import HYSOP_REAL
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.numerics import is_signed, is_unsigned, is_fp, is_complex
from hysop.tools.sympy_utils import subscripts, SymbolicBase
from hysop.parameters.parameter import Parameter
[docs]
class TensorParameter(Parameter):
"""
A tensor parameter is np.ndarray of a given dtype and shape
that may change value as simulation advances.
"""
[docs]
def __new__(
cls,
name,
shape,
dtype=HYSOP_REAL,
pretty_name=None,
initial_value=None,
min_value=None,
max_value=None,
ignore_nans=False,
**kwds,
):
"""
Create or get an existing TensorParameter with a specific name
shape and dtype.
Parameters
----------
name: string
A name for the parameter that uniquely identifies it.
pretty_name: string
A pretty name for the parameter.
shape: array like of ints
Shape of this TensorParameter.
dtype: type convertible to np.dtype, optional
Underlying dtype of this TensorParameter.
Defaults to HYSOP_REAL.
initial_value: optional, defaults to an uninitialized array.
Initial value as a scalar or np.ndarray.
If not initial value is given, all values initialized to 0.
min_value: scalar, optional, defaults to None
Minimum value allowed for this parameter.
max_value: scalar, optional, defaults to None
Maximum value allowed for this parameter.
ignore_nans: bool
Set this to True to allow NaN values.
kwds: dict
Base class arguments.
Attributes
----------
shape: array like of ints
Shape of this TensorParameter.
dtype: type convertible to np.dtype
Underlying dtype of this TensorParameter.
min_value: scalar, optional, defaults to None
Minimum value allowed for this TensorParameter.
max_value: scalar, optional, defaults to None
Maximum value allowed for this TensorParameter.
ignore_nans: bool
True if this TensorParameter can have NaN values.
"""
if ("allow_None" in kwds) and (kwds["allow_None"] is True):
msg = "A TensorParameter cannot be allowed to be set to None."
raise ValueError(msg)
check_instance(name, (str, SymbolicBase))
if isinstance(name, SymbolicBase):
symbol = name
name = symbol._name
pretty_name = symbol._pretty_name
else:
symbol = None
pretty_name = first_not_None(pretty_name, name)
check_instance(name, str)
check_instance(pretty_name, str)
check_instance(shape, (list, tuple), values=(int, np.integer), allow_none=True)
check_instance(ignore_nans, bool)
assert (min_value is None) or (max_value is None) or (min_value <= max_value)
parameter_types = (np.ndarray,)
if is_signed(dtype):
parameter_types += (np.int8, np.int16, np.int32, np.int64, int)
elif is_unsigned(dtype):
parameter_types += (np.uint8, np.uint16, np.uint32, np.uint64)
elif is_fp(dtype):
parameter_types += (
np.float16,
np.float32,
np.float64,
np.longdouble,
float,
)
elif is_complex(dtype):
parameter_types += (np.complex64, np.complex128, np.clongdouble, complex)
initial_value = cls._compute_initial_value(
shape, dtype, initial_value, min_value, max_value, ignore_nans
)
obj = super().__new__(
cls,
name=name,
pretty_name=pretty_name,
parameter_types=parameter_types,
allow_None=False,
initial_value=initial_value,
**kwds,
)
obj._min_value = min_value
obj._max_value = max_value
obj._ignore_nans = ignore_nans
from hysop.symbolic.parameter import (
SymbolicTensorParameter,
SymbolicScalarParameter,
)
if obj.__class__ is TensorParameter:
if symbol:
check_instance(symbol, SymbolicTensorParameter)
obj._symbol = symbol
else:
obj._symbol = SymbolicTensorParameter(parameter=obj)
else:
if symbol:
check_instance(symbol, sm.Symbol)
obj._symbol = symbol
else:
obj._symbol = SymbolicScalarParameter(parameter=obj)
return obj
def __init__(
self,
name,
shape,
dtype=HYSOP_REAL,
pretty_name=None,
initial_value=None,
min_value=None,
max_value=None,
ignore_nans=False,
**kwds,
):
super().__init__(
name=name,
pretty_name=pretty_name,
parameter_types=None,
allow_None=False,
initial_value=initial_value,
**kwds,
)
@classmethod
def _compute_initial_value(
cls,
shape,
dtype,
initial_value,
min_value=None,
max_value=None,
ignore_nans=None,
):
if not isinstance(dtype, np.dtype):
dtype = np.dtype(dtype)
if isinstance(initial_value, (tuple, list)):
initial_value = np.asarray(initial_value, dtype=dtype)
if isinstance(initial_value, np.ndarray):
check_instance(initial_value, np.ndarray)
assert initial_value.dtype == dtype, (initial_value.dtype, dtype)
assert initial_value.shape == shape, (initial_value.shape, shape)
cls.__check_values(
a=initial_value,
dtype=dtype,
min_value=min_value,
max_value=max_value,
ignore_nans=ignore_nans,
)
elif np.isscalar(initial_value):
assert shape is not None
assert (min_value is None) or (initial_value >= min_value)
assert (max_value is None) or (initial_value <= max_value)
initial_value = np.full(shape=shape, dtype=dtype, fill_value=initial_value)
elif initial_value is None:
if shape is None:
initial_value = None
else:
initial_value = np.zeros(shape=shape, dtype=dtype)
else:
msg = "Unknown initial_value type {}"
msg = msg.format(type(initial_value))
raise TypeError(msg)
return initial_value
[docs]
def reallocate_tensor(self, shape, dtype, initial_value=None):
self._value = self._compute_initial_value(
shape,
dtype,
initial_value,
min_value=self.min_value,
max_value=self.max_value,
ignore_nans=self.ignore_nans,
)
self._symbol = self._update_symbol()
def _update_symbol(self):
raise NotImplementedError
[docs]
def view(self, idx, name=None, pretty_name=None, **kwds):
"""Take a view on a scalar contained in the Parameter."""
assert self._value is not None
initial_value = self._value[tuple(slice(k, k + 1) for k in idx)]
_name = self.name + "_" + "_".join(str(i) for i in idx)
_pretty_name = self.pretty_name + subscripts(ids=idx, sep="")
name = first_not_None(name, _name)
pretty_name = first_not_None(pretty_name, _pretty_name)
if initial_value.size == 1:
from hysop.parameters.scalar_parameter import ScalarParameter
return ScalarParameter(
name=name,
pretty_name=pretty_name,
initial_value=initial_value.ravel(),
dtype=self.dtype,
min_value=self.min_value,
max_value=self.max_value,
ignore_nans=self.ignore_nans,
const=self.const,
quiet=self.quiet,
is_view=True,
**kwds,
)
else:
return TensorParameter(
name=name,
pretty_name=pretty_name,
initial_value=initial_value,
dtype=self.dtype,
shape=initial_value.shape,
min_value=self.min_value,
max_value=self.max_value,
ignore_nans=self.ignore_nans,
const=self.const,
quiet=self.quiet,
is_view=True,
**kwds,
)
[docs]
def iterviews(self):
"""Iterate over all parameters views to yield scalarparameters."""
for idx in np.ndindex(self.shape):
yield (idx, self.view(idx))
@classmethod
def __check_values(cls, a, min_value, max_value, ignore_nans, dtype):
if np.isscalar(a):
a = np.asarray([a], dtype=dtype)
assert (a is not None) and isinstance(a, np.ndarray)
if ignore_nans:
if min_value is not None:
assert np.all(np.nanmin(a) >= min_value), "min value constraint failed."
if max_value is not None:
assert np.all(np.nanmax(a) <= max_value), "max value constraint failed."
else:
if np.any(np.isnan(a)):
msg = "Given value contains NaNs:\n{}\n"
msg = msg.format(a)
raise ValueError(msg)
if min_value is not None:
assert np.all(np.min(a) >= min_value), "min value constraint failed."
if max_value is not None:
assert np.all(np.max(a) <= max_value), "max value constraint failed."
[docs]
def check_values(self, a):
return self.__check_values(
a,
dtype=self.dtype,
min_value=self.min_value,
max_value=self.max_value,
ignore_nans=self.ignore_nans,
)
def _get_shape(self):
"""Get parameter shape."""
return self._value.shape if (self._value is not None) else None
def _get_size(self):
"""Get parameter size."""
return self._value.size if (self._value is not None) else 0
def _get_dtype(self):
"""Get parameter dtype."""
assert self._value is not None
return self._value.dtype if (self._value is not None) else None
def _get_ctype(self):
"""Get the data type of the discrete field as a C type."""
from hysop.backend.device.codegen.base.variables import dtype_to_ctype
dtype = self.dtype
return dtype_to_ctype(dtype)
def _get_min_value(self):
"""Return minimum value allowed for array values."""
return self._min_value
def _get_max_value(self):
"""Return maximum value allowed for array values."""
return self._max_value
def _get_ignore_nans(self):
"""Return True if array can have NaNs."""
return self._ignore_nans
def _get_value_impl(self):
"""Return a read-only reference on the underlying data buffer."""
assert self._value is not None
view = self._value.view()
view.flags.writeable = False
return view.view()
def _get_tensor_value(self):
"""
Get a read-only view on this array parameter but always as a numpy array,
even for ScalarParameter parameters.
"""
return self._value.copy()
def _set_value_impl(self, value):
"""Given value will be copied into internal buffer."""
assert self._value is not None
assert (value is not None) and isinstance(value, np.ndarray)
if value.shape != self.shape:
msg = "Parameter shape mismatch, expected {} but got {}."
msg = msg.format(self.shape, value.shape)
raise ValueError(msg)
if value.dtype != self.dtype:
msg = "Parameter dtype mismatch, expected {} but got {}."
msg = msg.format(self.dtype, value.dtype)
raise ValueError(msg)
if (
(self.min_value is not None)
or (self.max_value is not None)
or (self.ignore_nans is False)
):
self.check_values(value)
self._value[...] = value
[docs]
def __getitem__(self, slices):
"""Get a read-only subview on this array parameter."""
assert self._value is not None
view = self._value.__getitem__(slices)
if np.isscalar(view):
return view
view.flags.writeable = False
return view.view()
[docs]
def __setitem__(self, slices, value):
"""Set a subview on this array parameter to a certain value (value can be broadcasted)."""
assert self._value is not None
self.check_values(value)
self._value.__setitem__(slices, value)
[docs]
def long_description(self):
ss = """\
TensorParameter[name={}, pname={}]
*shape: {}
*dtype: {}
*min_value: {}
*max_value: {}
*ignore_nans: {}
*value:\n{}
""".format(
self.name,
self.pretty_name,
self.shape,
self.dtype,
self.min_value,
self.max_value,
self.ignore_nans,
self.value,
)
return ss
[docs]
def short_description(self):
attrs = ("name", "pretty_name", "shape", "dtype", "min_value", "max_value")
info = []
for attr in attrs:
val = getattr(self, attr)
if val is not None:
info.append(f"{attr}={val}")
attrs = ", ".join(info)
ss = "TensorParameter[{}]"
ss = ss.format(attrs)
return ss
shape = property(_get_shape)
size = property(_get_size)
dtype = property(_get_dtype)
ctype = property(_get_ctype)
min_value = property(_get_min_value)
max_value = property(_get_max_value)
ignore_nans = property(_get_ignore_nans)
tensor_value = property(_get_tensor_value)